package com.gitee.jastee.kafka.group;

import com.gitee.jastee.kafka.config.Constant;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * @author jast
 * @date 2020/4/26 15:23
 */
public class SetConsumerGroupOffset {

    /**
     * Builds the consumer configuration shared by every reset strategy in this class.
     *
     * @param brokerList Kafka bootstrap servers, e.g. {@code "host1:9092,host2:9092"}
     * @param groupID    consumer group whose offsets are inspected/modified
     * @return properties for a {@code KafkaConsumer<String,String>} with auto-commit disabled
     */
    private static Properties getProperties(String brokerList, String groupID) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        // Offsets are committed explicitly by the reset methods, never automatically.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Cap each poll at a single record.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }

    /**
     * Entry point: prints the group's current offsets, with the various reset
     * strategies left commented out so exactly one can be enabled as needed.
     */
    public static void main(String[] args) throws InterruptedException {

        String brokers = Constant.KAFKA_BROKERS;
        String group = "behaviorTopic";
        String topic = "ruleResult";  // Kafka topic whose offsets are being reset

        // Show the group's current, earliest and latest offsets.
        getOffset(brokers, topic, group);

        // Uncomment one of the following to apply a reset strategy:

        // reset to the earliest available offset
        //setEarliest(brokers, topic, group);

        // reset to the latest (end) offset
        //setLatest(brokers, topic, group);

        // reset to a specific offset value
        //setSpecifiedOffset(brokers, topic, group);

        // shift the committed offset by N (negative = rewind)
        //setShiftByN(brokers, topic, group, -10);
    }

    /**
     * Shifts every partition's committed offset by {@code step}; a negative
     * {@code step} rewinds, a positive one skips forward.
     *
     * <p>Stop other consumers of this group before running, otherwise the
     * seek/commit can fail or be overwritten by their commits.
     *
     * @param brokerList Kafka bootstrap servers
     * @param topic      topic whose offsets are adjusted
     * @param groupID    consumer group to adjust
     * @param step       delta applied to each partition's committed offset
     */
    public static void setShiftByN(String brokerList , String topic,String groupID,long step) {
        Properties consumerProperties = getProperties(brokerList, groupID);

        // Shift-By-N strategy
        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            print(topic, consumer, "修改之前信息，topic:");

            // Join the group once, before iterating partitions (the original code
            // re-subscribed and re-polled on every loop iteration).
            consumer.subscribe(Collections.singleton(topic));
            consumer.poll(0);

            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                TopicPartition tp = new TopicPartition(topic, info.partition());
                OffsetAndMetadata committed = consumer.committed(tp);
                if (committed == null) {
                    // No committed offset for this partition yet: nothing to shift.
                    // (The original code would NPE here.)
                    System.out.println("Partition " + tp.partition() + " has no committed offset, skipped");
                    continue;
                }
                consumer.seek(tp, committed.offset() + step);
            }
            consumer.commitSync();
            print(topic, consumer, "修改之后信息，topic:");
        }
    }



    /**
     * Resets the group's offset on every partition of {@code topic} to a
     * hard-coded value (3087777). Kept for backward compatibility; delegates
     * to the parameterized overload.
     *
     * @param brokerList Kafka bootstrap servers
     * @param topic      topic whose offsets are reset
     * @param groupID    consumer group to reset
     */
    public static void setSpecifiedOffset(String brokerList , String topic,String groupID){
        setSpecifiedOffset(brokerList, topic, groupID, 3087777L);
    }

    /**
     * Resets the group's offset on every partition of {@code topic} to
     * {@code targetOffset}. If the value is below the partition's earliest
     * available offset, consumption effectively restarts from earliest.
     *
     * @param brokerList   Kafka bootstrap servers
     * @param topic        topic whose offsets are reset
     * @param groupID      consumer group to reset
     * @param targetOffset absolute offset applied to every partition
     */
    public static void setSpecifiedOffset(String brokerList, String topic, String groupID, long targetOffset) {
        Properties consumerProperties = getProperties(brokerList, groupID);

        // Specified-Offset strategy
        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            // Join the group so seek/commit have a partition assignment.
            consumer.subscribe(Collections.singleton(topic));
            consumer.poll(0);

            // Print the offsets before modification.
            print(topic, consumer, "修改之前信息，topic:");

            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                TopicPartition tp = new TopicPartition(topic, info.partition());
                consumer.seek(tp, targetOffset);
            }
            consumer.commitSync();

            // Print the offsets after modification.
            print(topic, consumer, "修改之后信息，topic:");
        }
    }

    /**
     * Re-seeks each partition to the group's currently committed offset.
     *
     * <p>Use case: a job consumed e.g. 100 records, failed at record 50, and
     * should re-consume from the last commit. Only meaningful when embedded in
     * the consuming application itself; running it standalone has no effect
     * (nothing is committed here).
     *
     * @param brokerList Kafka bootstrap servers
     * @param topic      topic to re-seek
     * @param groupID    consumer group whose committed offsets are used
     */
    public static void setCurrent(String brokerList , String topic,String groupID){

        Properties consumerProperties = getProperties(brokerList, groupID);

        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singleton(topic));
            consumer.poll(0);

            // Seek every partition straight back to its committed offset.
            consumer.partitionsFor(topic).stream().map(info ->
                    new TopicPartition(topic, info.partition()))
                    .forEach(tp -> {
                        OffsetAndMetadata committed = consumer.committed(tp);
                        // committed() is null when the group never committed for
                        // this partition; the original code would NPE here.
                        if (committed != null) {
                            consumer.seek(tp, committed.offset());
                        }
                    });

        }
    }

    /**
     * Prints the group's committed offset plus the earliest and latest broker
     * offsets for every partition of {@code topic}.
     *
     * @param brokerList Kafka bootstrap servers
     * @param topic      topic to inspect
     * @param groupID    consumer group whose committed offsets are printed
     */
    public static void getOffset(String brokerList , String topic,String groupID){
        Properties consumerProperties = getProperties(brokerList, groupID);

        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singleton(topic));
            print(topic, consumer, "当前信息，topic:");

            // Build the partition list once (the original fetched topic metadata
            // and rebuilt this list separately for end and beginning offsets).
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                    .collect(Collectors.toList());

            // Latest (end) offsets per partition.
            consumer.endOffsets(partitions)
                    .forEach((x, y) -> System.out.println("最新Partition:" + x.partition() + ",offset:" + y));

            // Earliest (beginning) offsets per partition.
            consumer.beginningOffsets(partitions)
                    .forEach((x, y) -> System.out.println("最早Partition:" + x.partition() + ",offset:" + y));

        }
    }

    /**
     * Resets the group's committed offsets to the latest (end) offsets.
     *
     * <p>Stop all other consumers of this group first: if another consumer owns
     * partitions, the seek below throws {@code IllegalStateException}, and any
     * commit it makes afterwards would overwrite this reset.
     *
     * @param brokerList Kafka bootstrap servers
     * @param topic      topic whose offsets are reset
     * @param groupID    consumer group to reset
     */
    public static void setLatest(String brokerList , String topic,String groupID) {
        Properties consumerProperties = getProperties(brokerList, groupID);

        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singleton(topic));
            // Join the group so seek/commit have a partition assignment
            // (the return value is intentionally unused).
            consumer.poll(0);

            print(topic, consumer, "修改之前信息，topic:");

            System.out.println("------------------------------------------");

            // Partition list, built once and shared by the strategies below.
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                    .collect(Collectors.toList());

            //----------------------------- Option 1 ----------------------------
            // seekToEnd(partitions): positions the consumer at the end but does
            // NOT commit by itself; offsets are committed only after consuming.
            // Intended for use embedded in a running consumer application.
//            consumer.seekToEnd(partitions);

            //----------------------------- Option 2 (used here) ----------------
            // Look up the end offset of each partition, seek to it, and commit.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            // Throws java.lang.IllegalStateException ("No current assignment for
            // partition ...") if another consumer in the group owns partitions.
            endOffsets.forEach(consumer::seek);
            consumer.commitSync();

            //----------------------------- Option 3 ----------------------------
            /**
             // commitSync(Map<TopicPartition,OffsetAndMetadata>) also works and
             // needs no assignment, but a concurrently running consumer gets no
             // warning and silently overwrites the reset with its own commits.
             Map<TopicPartition, OffsetAndMetadata>  topicPartitionOffsetAndMetadataMap = new HashMap<>();
             for (TopicPartition topicPartition : endOffsets.keySet()) {
             topicPartitionOffsetAndMetadataMap.put(topicPartition,new OffsetAndMetadata(endOffsets.get(topicPartition)));
             }
             consumer.commitSync(topicPartitionOffsetAndMetadataMap);
             */

            // Print the offsets after modification.
            print(topic, consumer, "修改之后信息，topic:");
        }
    }

    /**
     * Prints the group's committed offset for every partition of {@code topic}.
     *
     * @param topic    topic to inspect
     * @param consumer consumer used for metadata and committed-offset lookups
     * @param s        message prefix printed before each partition number
     */
    private static void print(String topic, KafkaConsumer<String, String> consumer, String s) {

        consumer.partitionsFor(topic).forEach(partitionInfo -> {
            TopicPartition tp = new TopicPartition(topic, partitionInfo.partition());
            OffsetAndMetadata committed = consumer.committed(tp);
            // committed() returns null when the group has never committed for
            // this partition; the original code would NPE in that case.
            System.out.println(s + partitionInfo.partition() + "\toffset:"
                    + (committed == null ? "none" : committed.offset()));
        });
    }

    /**
     * Resets the group's committed offsets to the earliest available offsets.
     *
     * <p>Stop all other consumers of this group first: if another consumer owns
     * partitions, the seek below throws {@code IllegalStateException}, and any
     * commit it makes afterwards would overwrite this reset.
     *
     * @param brokerList Kafka bootstrap servers
     * @param topic      topic whose offsets are reset
     * @param groupID    consumer group to reset
     */
    public static void setEarliest(String brokerList , String topic, String groupID) {
        Properties consumerProperties = getProperties(brokerList, groupID);

        try (final KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singleton(topic));
            // Join the group so seek/commit have a partition assignment
            // (the return value is intentionally unused).
            consumer.poll(0);

            // Print the offsets before modification.
            print(topic, consumer, "修改之前信息，topic:");

            System.out.println("------------------------------------------");

            // Partition list, built once and shared by the strategies below.
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
                    .collect(Collectors.toList());

            //----------------------------- Option 1 ----------------------------
            // seekToBeginning(partitions): positions the consumer at the start
            // but does NOT commit by itself; offsets are committed only after
            // consuming. Intended for use embedded in a running application.
//            consumer.seekToBeginning(partitions);

            //----------------------------- Option 2 (used here) ----------------
            // Look up the beginning offset of each partition, seek to it, commit.
            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);

            // Throws java.lang.IllegalStateException ("No current assignment for
            // partition ...") if another consumer in the group owns partitions.
            beginningOffsets.forEach(consumer::seek);
            consumer.commitSync();

            //----------------------------- Option 3 ----------------------------
            /**
             // commitSync(Map<TopicPartition,OffsetAndMetadata>) also works and
             // needs no assignment, but a concurrently running consumer gets no
             // warning and silently overwrites the reset with its own commits.
             Map<TopicPartition, OffsetAndMetadata>  topicPartitionOffsetAndMetadataMap = new HashMap<>();
             for (TopicPartition topicPartition : beginningOffsets.keySet()) {
             topicPartitionOffsetAndMetadataMap.put(topicPartition,new OffsetAndMetadata(beginningOffsets.get(topicPartition)));
             }
             consumer.commitSync(topicPartitionOffsetAndMetadataMap);
             */

            // Print the offsets after modification.
            print(topic, consumer, "修改之后信息，topic:");
        }
    }
}
